﻿using CqCore.Logging;
using CqCore.Messaging;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitmqService.Options;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitmqService
{
    public class RabbitmqPublisher : IMessagePublisher
    {
        readonly RabbitmqPublisherOption options = null;
        readonly ILogRecorder logger = null;
        IConnection connection = null;
        IModel channel = null;
        IBasicProperties properties = null;

        public event EventHandler<BasicReturnEventArgs> MessageRouteError;

        public RabbitmqPublisher(RabbitmqPublisherOption options, ILogRecorder<RabbitmqPublisher> logger)
        {
            this.options = options;
            this.logger = logger;
        }

        private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            logger.Warn(new EventData
            {
                Type = "RabbitmqConnection",
                Message = "RabbitMQ Connection Shutdown",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["ExchangeName"] = options.ExchangeName
                }
            });
        }
        private void Connection_RecoverySucceeded(object sender, EventArgs e)
        {
            logger.Warn(new EventData
            {
                Type = "RabbitmqConnectionRecovery",
                Message = "RabbitMQ Connection Recovery Succeeded",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["ExchangeName"] = options.ExchangeName
                }
            });
        }
        private void Connection_ConnectionRecoveryError(object sender, ConnectionRecoveryErrorEventArgs e)
        {
            logger.Warn(new EventData
            {
                Type = "RabbitmqConnectionRecovery",
                Message = "RabbitMQ Connection Recovery Error",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["ExchangeName"] = options.ExchangeName
                }
            });
        }

        private void Channel_BasicReturn(object sender, BasicReturnEventArgs e)
        {
            logger.Error(new EventData
            {
                Type = "RabbitmqMessageReturn",
                Message = "RabbitMQ Unroutable Message",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["ExchangeName"] = options.ExchangeName,
                    ["Topic"] = e.RoutingKey
                }
            });

            MessageRouteError?.Invoke(sender, e);
        }

        private void StartConnection()
        {
            if (connection != null && connection.IsOpen)
            {
                connection.Close();
            }

            connection = new ConnectionFactory()
            {
                HostName = options.HostName,
                Port = options.Port,
                UserName = options.UserName,
                Password = options.Password,
                RequestedHeartbeat = 10,
                AutomaticRecoveryEnabled = true
            }.CreateConnection();

            connection.ConnectionShutdown += Connection_ConnectionShutdown;
            connection.RecoverySucceeded += Connection_RecoverySucceeded;
            connection.ConnectionRecoveryError += Connection_ConnectionRecoveryError;

            logger.Debug(new EventData
            {
                Type = "RabbitmqConnection",
                Message = "RabbitMQ Connection Successfully",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["ExchangeName"] = options.ExchangeName
                }
            });
        }
        private void StartChannel()
        {
            if (channel != null && channel.IsOpen)
            {
                channel.Close();
            }

            StartConnection();

            channel = connection.CreateModel();
            channel.BasicReturn += Channel_BasicReturn;

            properties = channel.CreateBasicProperties();
            properties.Persistent = options.Persistent;

            logger.Debug(new EventData
            {
                Type = "RabbitmqChannel",
                Message = "RabbitMQ Channel Successfully",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["ExchangeName"] = options.ExchangeName
                }
            });
        }
        private void Start()
        {
            try
            {
                StartChannel();

                logger.Info(new EventData
                {
                    Type = "RabbitmqStart",
                    Message = "RabbitMQ Start Successfully",
                    Labels = {
                        ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                        ["ExchangeName"] = options.ExchangeName
                    }
                });
            }
            catch (Exception ex)
            {
                logger.Warn(new EventData
                {
                    Type = "RabbitmqConnection",
                    Message = ex.GetBaseException().Message,
                    Labels = {
                        ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                        ["ExchangeName"] = options.ExchangeName
                    }
                });

                Thread.Sleep(TimeSpan.FromSeconds(options.StartRetryInterval));

                Start();
            }
        }

        public Task StartAsync()
        {
            return Task.Run(() => Start());
        }

        private Task<bool> SendMessage<TMessage>(string topic, TMessage message)
        {
            try
            {
                channel.BasicPublish(
                    exchange: options.ExchangeName,
                    routingKey: topic,
                    basicProperties: properties,
                    body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)),
                    //body: MessagePackSerializer.NonGeneric.Serialize(typeof(TMessage), message),
                    mandatory: true
                );

                logger.Debug(new EventData
                {
                    Type = "RabbitmqSendMessage",
                    Message = "RabbitMQ Send Message Completed",
                    Labels = {
                        ["ExchangeName"] = options.ExchangeName,
                        ["Topic"] = topic,
                        ["Message"] = JsonConvert.SerializeObject(message)
                    }
                });

                return Task.FromResult(true);
            }
            catch (Exception ex)
            {
                logger.Error(new EventData
                {
                    Type = "RabbitmqSendMessage",
                    Message = ex.GetBaseException().Message,
                    Labels = {
                        ["ExchangeName"] = options.ExchangeName,
                        ["Topic"] = topic,
                        ["Message"] = JsonConvert.SerializeObject(message)
                    }
                });

                return Task.FromResult(false);
            }
        }

        public async Task PublishAsync<TMessage>(string topic, TMessage message)
        {
                await SendMessage<TMessage>(topic, message);
        }

        public void Dispose()
        {
            channel.Close();
            connection.Close();
        }
    }
}
